package drds.plus.executor.cursor.cursor.impl.merge;

import drds.plus.executor.ExecuteContext;
import drds.plus.executor.cursor.cursor.IMergeCursor;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.impl.DuplicateValueRowDataLinkedList;
import drds.plus.executor.cursor.cursor.impl.SortingCursor;
import drds.plus.executor.data_node_executor.DataNodeExecutorContext;
import drds.plus.executor.record_codec.record.KeyValueRecordPair;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.row_values.RowValues;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import drds.plus.sql_process.abstract_syntax_tree.expression.order_by.OrderBy;
import drds.plus.util.GeneralUtil;
import drds.tools.$;

import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 针对每个cursor挨着取值 不能后退和跳跃
 */
public class ConcurrentMergeCursor extends SortingCursor implements IMergeCursor {

    protected final ExecuteContext executeContext;
    protected List<ISortingCursor> sortingCursorList = new ArrayList();
    protected int currentIndex = 0;
    List<RuntimeException> exceptionsWhenCloseSubCursor = new ArrayList();
    private List<ExecutePlan> executePlanList;
    private RowValues current;

    public ConcurrentMergeCursor(ExecuteContext executeContext, List<ExecutePlan> executePlanList) {
        super(null, null, null);
        this.executePlanList = executePlanList;
        this.executeContext = executeContext;
    }

    public void init() throws RuntimeException {
        if (this.inited) {
            return;
        }

        List<Future<ISortingCursor>> futureList = new LinkedList<Future<ISortingCursor>>();
        for (ExecutePlan executePlan : executePlanList) {
            futureList.add(DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().executeWithFuture(executeContext, executePlan));
        }
        List<RuntimeException> RuntimeExceptionList = new ArrayList();
        for (Future<ISortingCursor> future : futureList) {
            try {
                sortingCursorList.add(future.get(15, TimeUnit.MINUTES));
            } catch (Throwable e) {
                RuntimeExceptionList.add(new RuntimeException(e));
            }
        }
        if ($.isNotNullAndHasElement(RuntimeExceptionList)) {
            throw RuntimeExceptionList.get(0);
        }
        currentIndex = 0;
        super.init();
    }

    public void beforeFirst() throws RuntimeException {
        this.currentIndex = 0;
        this.current = null;
        for (int i = 0; i < sortingCursorList.size(); i++) {
            sortingCursorList.get(i).beforeFirst();
        }
    }

    public RowValues first() throws RuntimeException {
        throw new UnsupportedOperationException();
    }

    public RowValues next() throws RuntimeException {
        init();
        RowValues rowData = innerNext();
        return rowData;
    }

    private RowValues innerNext() throws RuntimeException {
        init();
        RowValues rowData;
        while (true) {
            if (currentIndex >= sortingCursorList.size()) {// 取尽所有cursor.
                return null;
            }
            ISortingCursor sortingCursor = sortingCursorList.get(currentIndex);
            rowData = sortingCursor.next();
            if (rowData != null) {
                return rowData;
            } else {
                sortingCursorList.get(currentIndex).close(exceptionsWhenCloseSubCursor);
                currentIndex++;
            }
        }
    }

    public RowValues current() throws RuntimeException {
        return this.current;
    }

    public List<RuntimeException> close(List<RuntimeException> exs) {
        exs.addAll(exceptionsWhenCloseSubCursor);
        for (ISortingCursor cursor : sortingCursorList) {
            exs = cursor.close(exs);
        }
        return exs;
    }

    public boolean skipTo(Record keyRecord) throws RuntimeException {
        throw new UnsupportedOperationException("skip to is not supported");
    }

    public List<ISortingCursor> getSortingCursorList() {
        return sortingCursorList;
    }

    public List<OrderBy> getOrderByList() throws RuntimeException {
        init();
        return this.sortingCursorList.get(0).getOrderByList();
    }


    public boolean skipTo(KeyValueRecordPair keyKeyValueRecordPair) throws RuntimeException {
        throw new UnsupportedOperationException();
    }


    public RowValues prev() throws RuntimeException {
        throw new UnsupportedOperationException();
    }


    public RowValues last() throws RuntimeException {
        throw new UnsupportedOperationException();
    }

    public boolean delete() throws RuntimeException {
        throw new UnsupportedOperationException();
    }

    public RowValues getNextDuplicateValueRowData() throws RuntimeException {
        throw new UnsupportedOperationException();
    }

    public void put(Record key, Record value) throws RuntimeException {
        throw new UnsupportedOperationException();

    }

    public Map<Record, DuplicateValueRowDataLinkedList> mgetRecordToDuplicateValueRowDataLinkedListMap(List<Record> keyRecordList, boolean prefixMatch, boolean keyFilterOrValueFilter) throws RuntimeException {
        throw new UnsupportedOperationException();
    }

    public List<DuplicateValueRowDataLinkedList> mgetDuplicateValueRowDataLinkedListList(List<Record> keyRecordList, boolean prefixMatch, boolean keyFilterOrValueFilter) throws RuntimeException {
        throw new UnsupportedOperationException();
    }

    public boolean isDone() {
        return true;
    }

    public String toString() {
        return toStringWithInden(0);
    }

    public String toStringWithInden(int inden) {
        String tabTittle = GeneralUtil.getTab(inden);
        String tabContent = GeneralUtil.getTab(inden + 1);
        StringBuilder sb = new StringBuilder();

        GeneralUtil.printlnToStringBuilder(sb, tabTittle + "ConcurrentMergeCursor ");
        GeneralUtil.printAFieldToStringBuilder(sb, "addOrderByItemAndSetNeedBuild", this.orderByList, tabContent);

        for (ISortingCursor cursor : sortingCursorList) {
            sb.append(cursor.toStringWithInden(inden + 1));
        }
        return sb.toString();

    }

    public List<List<OrderBy>> getJoinOrderByListList() throws RuntimeException {
        return Arrays.asList(this.getOrderByList());

    }

    public List<ColumnMetaData> getColumnMetaDataList() throws RuntimeException {
        init();
        return this.sortingCursorList.get(0).getColumnMetaDataList();
    }

}
